Java多线程之AQS
Contents
队列同步器
队列同步器 AbstractQueuedSynchronizer ,是用来构建锁或者其他同步组件的基础框架,它使用了int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解,锁是面向使用者的,它定义了使用者于锁交互的接口,隐藏了实现细节,同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理,线程的排队,等待,唤醒等底层操作,锁与同步器很好的隔离了使用者和实现者所需要关注的领域。
AQS定义了一套多线程访问共享资源的同步器框架。许多同步类的实现都依赖它。
它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词。
state的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
自定义同步器时需要重写的方法
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在底层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
总结一下来说,就是用getState() setState() compareAndSetState()
三个能获得/改变同步状态的方法来重写上面列出的五个常见的方法 就可以实现一个自定义的同步组件(已经有的同步组件就是ReentrantLock ,ReadWritelLock
等)
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
流程
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
Voliatile
为了保持可见性,prev ,next 这两个Node 类型的节点指针都是volatile 的,waitStatus(每个节点的等待状态也是), thread(获取同步状态的线程也是) 当然,同步状态status 肯定也是。
AQS 的实现分析
Acquire(int)
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:
1 | public final void acquire(int arg) { |
函数流程包括:
1 tryAcquire() 尝试直接去获取资源,如果成功则直接返回。
2 addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式。
3 acquireQueued() 使线程在等待队列中获取资源,一直获取到资源后才返回,如果在整个等待过程中被中断过,则返回true,否则返回false。
4 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
如果直接获取到同步状态或者在队列中获取到就算获取到了同步状态
tryAcquire(int)
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:
1 | protected boolean tryAcquire(int arg) { |
AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。
addWaiter(Node)
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
1 | private Node addWaiter(Node mode) { |
Node结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
- CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
- SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
- CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
- 0状态:值为0,代表初始化状态。
AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。
enq(Node)
此方法用于将node 加入队尾。
1 | private Node enq(final Node node) { |
acquireQueued(Node,int)
OK,通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。没错,就是这样!是不是跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。
节点进入同步队列后,就进入了一个自旋的状态,每个节点(每个线程)都在自省的观察,当条件满足(这里的条件就是前驱是头节点且拿到了同步状态,见下面感叹号标识的那一行,),获取到了同步状态,就可以从这个自旋状态中退出。否则依然保留在这个自旋状态中(并阻塞这个节点表示的线程,也就是说,每个节点都会自旋,但是只有发现自己的前驱是头节点且获取同步状态后才会从自旋状态出退出,通过LockSupport.unpark()这个方法来唤醒阻塞的节点。其他前驱不是头结点的仍然一直阻塞在自旋状态中,并且,先知道自己的前驱是头结点,才会尝试获取同步状态)
1 | final boolean acquireQueued(final Node node, int arg) { |
- 结点进入队尾后,检查状态,找到安全休息点;
- 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
- 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
重入锁的实现原理
重入锁的非公平方法增加了再次获取同步状态的逻辑,非重入锁只要status==1 表示获取锁,重入锁通过判断当前线程是否为获取锁的线程来决定获取操作能否成功。如果获取锁的线程再次请求,则将同步状态值进行增加并返回ture, 表示获取同步状态成功。
如果是公平锁,还应该在判断条件那加上hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早的请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。
读写锁的实现原理
读写锁同样依赖AQS, 读写锁将变量分成了两个部分,高16位表示读,低16位表示写,读写锁通过位运算快速的确定读和写各自的状态,
写锁是一个支持重进入的排它锁,如果当前线程已经获取了写锁,则增加写状态,如果当前线程在获取写锁时,读锁已经被获取或者改线程不是已经获取写锁的线程,则当前线程进入等待状态。
写锁也有可能锁降级,锁降级指的是把持住写锁,再获取到读锁,随后释放写锁的过程。
Author: corn1ng
Link: https://corn1ng.github.io/2018/01/19/Java-多线程(四) AQS/
License: 知识共享署名-非商业性使用 4.0 国际许可协议